"""Note this reduce task runner is specifically made to handle heavy keys where
The data might be very large at a particular reduce worker. It doesn't write
The output to a dictionary rather produces to file where each key is on a
single line.
Conditions:
1. The key should not contain ':' in it anywhere
2. Any value in the output should not contain any newline character or a comma
3. The number of keys should be less, because each key makes up an input file

NOTE: It specifically works with codes.reducethresh.Reduce
"""

from ma.commons.core.reducetaskrunner import ReduceTaskRunner
from ma.commons.core.abstractreducer import AbstractReducer
from ma.commons.core.processrunner import ProcessRunner
import ma.commons.core.constants as constants
import ma.const
import ma.log
import ma.utils.listutils as listutils
from ma.commons.core.maptaskrunner import MapTaskRunner

import sys
import pickle
import time
import os
import traceback

# TODO: Fix the loggers ... since they are running on a separate process


class MRPlusReduceTaskRunnerHeavyKey(ReduceTaskRunner):
    """Reduce task runner for "heavy" keys.

    Each key's shuffled input values are written to a dedicated temp file
    (one file per key) instead of being held in an in-memory dictionary,
    and the user reducer class is run once per key against that file.

    NOTE: works specifically with codes.reducethresh.Reduce; see the module
    docstring for the key/value format restrictions.
    """

    def __init__(self, tip_info, user_module_name, user_class_name):
        """Initialize the base ReduceTaskRunner and a logger for this runner.

        :param tip_info: task-in-progress info (job id, task id, input data,
            list of all keys, struct data)
        :param user_module_name: name of the module holding the user reducer
        :param user_class_name: name of the user reducer class
        """
        ReduceTaskRunner.__init__(self, tip_info, user_module_name, user_class_name)

        self.__log = ma.log.get_logger("ma.codes")

    def startTask(self):
        """Run the user-defined reduce code, one key at a time.

        Splits every shuffled input file into one temp file per key, runs the
        user reducer (as a thread, joined immediately) against each key file,
        and finally writes ``self.key_value_dict`` to the reduce output file.

        :returns: ``[True, not_seen_keys, not_changed_keys, thresholded_keys]``
            on success, ``[False]`` if writing the output fails. Exits the
            process with -1 if an input file cannot be read.
        """
        input_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, self.tip_info.job_id) + os.sep

        temp_filename_prefix = str(self.tip_info.job_id) + str(constants.REDUCE) + str(self.tip_info.task_id) + '_'
        compute_temp_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_compute_temp_dir, self.tip_info.job_id) + os.sep + temp_filename_prefix

        list_of_keys = []

        # lists tracking changes in keys
        list_of_not_seen_keys = []
        list_of_not_changed_keys = []
        list_of_thresholded_keys = []

        for input_data in self.tip_info.input_data:
            # input may come from a map task or from a previous reduce round
            if input_data[1] == constants.MAP:
                filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_map_output_filename, input_data[0], input_data[2])
            else:
                filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_output_filename, input_data[0], input_data[2])

            # filepath for the shuffled input
            filepath = input_dest_dir + filename

            try:
                self.__log.info('Input file to reduce %d: %s', self.tip_info.task_id, filepath)

                # split the input file into one temp file per key; the context
                # manager guarantees the handle is closed even on error
                with open(filepath, 'r') as f:
                    list_of_keys_input = self.writeKeysOutputFiles(f, compute_temp_path)

                self.__log.info('Input file to reduce %d read completely: %s', self.tip_info.task_id, filepath)

                # add all keys to our key list
                list_of_keys = listutils.list_union(list_of_keys, list_of_keys_input)

            except IOError as err_msg:
                self.__log.error("Error while reading reduce input or running task: %s", err_msg)
                os._exit(-1)

        self.__log.debug('Read all files into the dictionary for Reduce %d', self.tip_info.task_id)

        # build the list of keys not encountered in the input data
        if self.tip_info.list_of_all_keys is not None:
            for key in self.tip_info.list_of_all_keys:
                if key not in list_of_keys:
                    print(("------ Reduce ID %d Job ID %d -----> LIST OF NOT SEEN KEYS - %s" % (self.tip_info.task_id, self.tip_info.job_id, str(key))))
                    list_of_not_seen_keys.append(key)

        self.__log.debug('List of keys that weren\'t encountered: %s', list_of_not_seen_keys)
        self.__log.info('Number of keys for reduce %d: %d', self.tip_info.task_id, len(list_of_keys))

        # get the required constants important for the computation
        change_margin = ma.const.JobsXmlData.get_float_data(ma.const.xml_est_reduce_margin, self.tip_info.job_id)
        min_inputs_for_change = ma.const.JobsXmlData.get_int_data(ma.const.xml_min_inputs_for_change, self.tip_info.job_id)
        threshold_value = ma.const.JobsXmlData.get_float_data(ma.const.xml_threshold_value, self.tip_info.job_id)
        input_compute_buffer = ma.const.JobsXmlData.get_int_data(ma.const.xml_reduce_inp_compute_size, self.tip_info.job_id)

        # resolve the user reducer class once instead of eval()-ing a string
        # per key; eval was both slower and unsafe should the class name ever
        # come from an untrusted source
        reducer_cls = getattr(self.user_class, self.user_class_name)

        # iterate through all keys
        for key in list_of_keys:
            # NOTE(review): this list is never populated anywhere, so the
            # "key not changed" check below can only fire when
            # min_inputs_for_change <= 0 — preserved as-is; confirm intent
            input_values = []

            # filepath containing data for this specific key
            key_file = compute_temp_path + key

            # single-element list so the reducer thread can report a change
            # back to us by mutating it
            if_changed = [False]
            user_vars = [change_margin, if_changed, threshold_value, input_compute_buffer]

            th = reducer_cls(key, key_file, self.key_value_dict, user_vars)
            self.threads.append(th)

            # check if the Reducer is thresholdable
            thresholdable = th.thresholdable

            # start computation as thread and wait for finish
            th.start()
            th.join()

            # remove the computation file
            os.remove(key_file)

            # if the value has been reported by the reduce worker as not
            #    changed, and the number of inputs to note this change is
            #    enough, add to the list of not changed keys
            if not if_changed[0] and len(input_values) >= min_inputs_for_change:
                print(("------ Reduce ID %d Job ID %d -----> KEY NOT CHANGED - %s" % (self.tip_info.task_id, self.tip_info.job_id, str(key))))
                list_of_not_changed_keys.append(key)

            # if there is a valid threshold value and the Reducer is Thresholdable
            if threshold_value is not None and thresholdable == AbstractReducer.THRESHOLDABLE:
                # if the key has been added to the dictionary by the reduce worker
                if key in self.key_value_dict:
                    final_key_value = self.key_value_dict[key][0]

                    # if the key's value has crossed the threshold
                    if final_key_value > threshold_value:
                        print(("------ Reduce ID %d Job ID %d -----> KEY THRESHOLDED - %s" % (self.tip_info.task_id, self.tip_info.job_id, str(key))))
                        list_of_thresholded_keys.append((key, time.time(), self.tip_info.task_id, self.tip_info.struct_data))

        self.__log.debug('No. of keys out of %d those didn\'t change: %s', len(list_of_keys), len(list_of_not_changed_keys))

        self.__log.info('Actual processing on reduce %d ENDED', self.tip_info.task_id)

        if not self.key_value_dict:
            print('---------------------> Hey! Empty dictionary being written as reduce output', self.tip_info.task_id, self.tip_info.input_data)

        # write the dictionary to the output file
        try:
            # working out a reduce output file path name
            output_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir,
                                                                        self.tip_info.job_id)

            output_filepath = output_dest_dir + os.sep + \
                    ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_output_filename, self.tip_info.job_id, self.tip_info.task_id)

            self.__log.info('Starting to write reduce %d output: %s', self.tip_info.task_id, output_filepath)

            # context manager closes the output even if write_keylines raises
            with open(output_filepath, "w+") as fd:
                MapTaskRunner.write_keylines(fd, self.key_value_dict)

            self.__log.info('Written reduce %d output: %s', self.tip_info.task_id, output_filepath)

            return [ True, list_of_not_seen_keys, list_of_not_changed_keys, list_of_thresholded_keys ]
        except IOError as err_msg:
            self.__log.error('Error while writing reduce output task-id %d, job-id %d: %s', self.tip_info.task_id, self.tip_info.job_id, err_msg)
            return [ False ]
            

if __name__ == "__main__":
    """This function is called by the process creator to transfer information
    about the reduce task to run
    """
    
    try:
        print("Running MRPlusReduceTaskRunnerHeavyKey process pid", os.getpid())
        
        if len(sys.argv) == 5:
            user_module_name = sys.argv[1]
            user_class_name = sys.argv[2]
            job_id = sys.argv[3]
            task_id = sys.argv[4]
            #print user_module_name, user_class_name
            
            # get the filename which holds the reduce compute misc. output
            proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, job_id, task_id)
            
            # get the pickled list data
            list = ProcessRunner.pull_pickled_data(proc_temp_filename, job_id)
            reducetipinfo = list.__getitem__(0)
            #print 'Job ID:', maptipinfo.job_id
            #print 'Task ID:', maptipinfo.task_id
            #print 'Map or reduce:', maptipinfo.map_or_reduce
            #print 'Struct data:', maptipinfo.struct_data
            #print 'Input data:', maptipinfo.input_data
            
            task_runner = MRPlusReduceTaskRunnerHeavyKey(reducetipinfo, user_module_name, user_class_name)
            # open files, process, write output
            ret = task_runner.startTask()
            
            # print "reducetipinfo.list_of_all_keys:", reducetipinfo.list_of_all_keys
            print(("Reduce runner %d of job-id %d successful: %s" % (reducetipinfo.task_id, reducetipinfo.job_id, str(ret[0]))))
            
            if not ret[0]:
                # some error occurred
                os._exit(-1)
            else:
                # pack and pickle the lists back to the ReduceTIP
                list_of_not_seen_keys = ret[1]
                list_of_not_changed_keys = ret[2]
                list_of_thresholded_keys = ret[3]
                
                # pickle the list and write to temp file
                list = [list_of_not_seen_keys, list_of_not_changed_keys, list_of_thresholded_keys]
                proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, reducetipinfo.job_id, reducetipinfo.task_id)
                ProcessRunner.store_pickled_data(list, proc_temp_filename, reducetipinfo.job_id)
                    
                # ran fine
                os._exit(0)
        else:
            print('Wrong number of arguments for ReduceTaskRunner process')
            os._exit(-1)
            
    except Exception as err_msg:
        traceback.print_exception()
        print("MRPlusReduceTaskRunnerHeavyKey:__main__ Exception in running reduce: %s" % err_msg)
        os._exit(-1)
